{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# K-Means Multi-Node Multi-GPU (MNMG) Demo\n",
    "\n",
    "K-Means multi-Node multi-GPU implementation leverages Dask to spread data and computations across multiple workers. cuML uses One Process Per GPU (OPG) layout, which maps a single Dask worker to each GPU.\n",
    "\n",
    "The main difference between cuML's MNMG implementation of k-means and the single-GPU is that the fit can be performed in parallel for each iteration, sharing only the centroids between iterations. The MNMG version also provides the same scalable k-means++ initialization algorithm as the single-GPU version.\n",
    "\n",
    "Unlike the single-GPU implementation, The MNMG k-means API requires a Dask Dataframe or Array as input. `predict()` and `transform()` return the same type as input. The Dask cuDF Dataframe API is very similar to the Dask DataFrame API, but underlying Dataframes are cuDF, rather than Pandas. Dask cuPy arrays are also available.\n",
    "\n",
    "For information about cuDF, refer to the [cuDF documentation](https://docs.rapids.ai/api/cudf/stable).\n",
    "\n",
    "For additional information on cuML's k-means implementation: \n",
    "https://docs.rapids.ai/api/cuml/stable/api.html#cuml.dask.cluster.KMeans."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Imports"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from cuml.dask.cluster.kmeans import KMeans as cuKMeans\n",
    "from cuml.dask.common import to_dask_df\n",
    "from cuml.dask.datasets import make_blobs\n",
    "from cuml.metrics import adjusted_rand_score\n",
    "from dask.distributed import Client, wait\n",
    "from dask_cuda import LocalCUDACluster\n",
    "from dask_ml.cluster import KMeans as skKMeans\n",
    "import cupy as cp"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Start Dask Cluster\n",
    "\n",
    "We can use the `LocalCUDACluster` to start a Dask cluster on a single machine with one worker mapped to each GPU. This is called one-process-per-GPU (OPG). "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "cluster = LocalCUDACluster(threads_per_worker=1)\n",
    "client = Client(cluster)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Define Parameters"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "n_samples = 1000000\n",
    "n_features = 2\n",
    "\n",
    "n_total_partitions = len(list(client.has_what().keys()))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Generate Data\n",
    "\n",
    "### Device\n",
    "\n",
    "We can generate a Dask cuPY Array of synthetic data for multiple clusters using `cuml.dask.datasets.make_blobs`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "X_dca, Y_dca = make_blobs(n_samples, \n",
    "                          n_features,\n",
    "                          centers = 5, \n",
    "                          n_parts = n_total_partitions,\n",
    "                          cluster_std=0.1, \n",
    "                          verbose=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Host\n",
    "\n",
    "We collect the Dask cuPy Array on a single node as a cuPy array. Then we transfer the cuPy array from device to host memory into a Numpy array."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "X_cp = X_dca.compute()\n",
    "X_np = cp.asnumpy(X_cp)\n",
    "del X_cp"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Scikit-learn model\n",
    "\n",
    "### Fit and predict\n",
    "\n",
    "Since a scikit-learn equivalent to the multi-node multi-GPU K-means in cuML doesn't exist, we will use Dask-ML's implementation for comparison."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "kmeans_sk = skKMeans(init=\"k-means||\",\n",
    "                     n_clusters=5,\n",
    "                     n_jobs=-1,\n",
    "                     random_state=100)\n",
    "\n",
    "kmeans_sk.fit(X_np)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "labels_sk = kmeans_sk.predict(X_np).compute()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## cuML Model\n",
    "\n",
    "### Fit and predict"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "kmeans_cuml = cuKMeans(init=\"k-means||\",\n",
    "                       n_clusters=5,\n",
    "                       random_state=100)\n",
    "\n",
    "kmeans_cuml.fit(X_dca)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "labels_cuml = kmeans_cuml.predict(X_dca).compute()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Compare Results"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "score = adjusted_rand_score(labels_sk, labels_cuml)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "passed = score == 1.0\n",
    "print('compare kmeans: cuml vs sklearn labels_ are ' + ('equal' if passed else 'NOT equal'))"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "cuml_dev",
   "language": "python",
   "name": "cuml_dev"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
